﻿using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Nito.AsyncEx;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using CK.Sprite.RabbitMQ.Core;

namespace CK.Sprite.RabbitMQ.Service
{
    public abstract class CommonQueue<T> : ICommonQueue
    {
        private const string ChannelPrefix = "CommonQueue.";
        protected IChannelAccessor ChannelAccessor { get; private set; }
        protected EventingBasicConsumer Consumer { get; private set; }

        public ILogger<CommonQueue<T>> Logger { get; set; }
        protected IChannelPool ChannelPool { get; }
        protected IRabbitMqSerializer Serializer { get; }
        protected IServiceScopeFactory ServiceScopeFactory { get; }

        protected SemaphoreSlim SyncObj = new SemaphoreSlim(1, 1);
        protected bool IsDiposed { get; private set; }
        protected CommonRabbitMqOptions CommonRabbitMqOptions { get; set; }
        protected CommonQueueConfiguration CommonQueueConfiguration { get; set; }

        public CommonQueue(
            IChannelPool channelPool,
            IRabbitMqSerializer serializer,
            IOptions<CommonRabbitMqOptions> commonRabbitMqOptions,
            IServiceScopeFactory serviceScopeFactory, ILogger<CommonQueue<T>> logger)
        {
            Serializer = serializer;
            ServiceScopeFactory = serviceScopeFactory;
            ChannelPool = channelPool;

            CommonRabbitMqOptions = commonRabbitMqOptions.Value;

            CommonQueueConfiguration = GetOrCreateCommonQueueConfiguration();

            Logger = logger;
        }

        #region 静态方法

        /// <summary>
        /// 队列名称
        /// </summary>
        public abstract string QueueName { get; }

        /// <summary>
        /// 执行方法
        /// </summary>
        /// <param name="objReceive">消息对象</param>
        /// <returns></returns>
        public abstract Task ExecuteMessage(T objReceive);

        /// <summary>
        /// 是否是消费者
        /// </summary>
        public abstract bool IsConsumer { get; }

        #endregion

        protected virtual CommonQueueConfiguration GetOrCreateCommonQueueConfiguration()
        {
            return CommonRabbitMqOptions.CommonQueueConfigs.GetOrDefault(QueueName);
        }

        public virtual async Task<string> EnqueueAsync(
            object args,
            byte priority = 5,
            TimeSpan? delay = null)
        {
            CheckDisposed();

            using (await SyncObj.LockAsync())
            {
                await EnsureInitializedAsync();

                await PublishAsync(args, priority, delay);

                return null;
            }
        }

        public virtual async Task StartAsync(CancellationToken cancellationToken = default)
        {
            CheckDisposed();

            using (await SyncObj.LockAsync())
            {
                await EnsureInitializedAsync();
            }
        }

        public virtual Task StopAsync(CancellationToken cancellationToken = default)
        {
            Dispose();
            return Task.CompletedTask;
        }

        public virtual void Dispose()
        {
            if (IsDiposed)
            {
                return;
            }

            IsDiposed = true;

            ChannelAccessor?.Dispose();
        }

        protected virtual Task EnsureInitializedAsync()
        {
            if (ChannelAccessor != null)
            {
                return Task.CompletedTask;
            }

            ChannelAccessor = ChannelPool.Acquire(
                ChannelPrefix + CommonQueueConfiguration.QueueName,
                CommonQueueConfiguration.ConnectionName
            );

            var result = CommonQueueConfiguration.Declare(ChannelAccessor.Channel);
            Logger.LogDebug($"RabbitMQ Queue '{CommonQueueConfiguration.QueueName}' has {result.MessageCount} messages and {result.ConsumerCount} consumers.");

            if (IsConsumer)
            {
                Consumer = new EventingBasicConsumer(ChannelAccessor.Channel);
                Consumer.Received += MessageReceived;

                //TODO: What BasicConsume returns?
                ChannelAccessor.Channel.BasicConsume(
                    queue: CommonQueueConfiguration.QueueName,
                    autoAck: false,
                    consumer: Consumer
                );
            }

            return Task.CompletedTask;
        }

        protected virtual Task PublishAsync(
            object args,
            byte priority = 5,
            TimeSpan? delay = null)
        {
            //TODO: How to handle priority & delay?
            Logger.LogWarning($"publish queue,{QueueName},{JsonConvert.SerializeObject(args)}");

            ChannelAccessor.Channel.BasicPublish(
                exchange: "",
                routingKey: CommonQueueConfiguration.QueueName,
                basicProperties: CreateBasicPropertiesToPublish(priority),
                body: Serializer.Serialize(args)
            );

            Logger.LogWarning($"end publish queue,{QueueName},{JsonConvert.SerializeObject(args)}");
            return Task.CompletedTask;
        }

        protected virtual IBasicProperties CreateBasicPropertiesToPublish(byte priority)
        {
            var properties = ChannelAccessor.Channel.CreateBasicProperties();
            properties.Priority = priority;
            properties.Persistent = true;
            return properties;
        }

        protected virtual void MessageReceived(object sender, BasicDeliverEventArgs ea)
        {
            using (var scope = ServiceScopeFactory.CreateScope())
            {
                try
                {
                    AsyncHelper.RunSync(() => ExecuteMessage(Serializer.Deserialize<T>(ea.Body)));
                    ChannelAccessor.Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                catch (CommonQueueException ex)
                {
                    //TODO: Reject like that?
                    Logger.LogError(ex, $"CommonQueueException,{QueueName},{JsonConvert.SerializeObject(Serializer.Deserialize<T>(ea.Body))},{ex.Message}");
                    ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                }
                catch (Exception ex)
                {
                    //TODO: Reject like that?
                    Logger.LogError(ex, $"QueueException,{QueueName},{JsonConvert.SerializeObject(Serializer.Deserialize<T>(ea.Body))},{ex.Message}");
                    ChannelAccessor.Channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                }
            }
        }

        protected void CheckDisposed()
        {
            if (IsDiposed)
            {
                throw new Exception("This object is disposed!");
            }
        }
    }
}